#include "remm_async.h"

/*
 * Allocate a new remm item block: an in-memory lsv_remm_block descriptor plus
 * a LSV_CHUNK_SIZE staging buffer, backed by a freshly allocated on-volume
 * chunk. On success the block is appended to remm_info->remm_list and
 * returned; on failure all local allocations are released and NULL is
 * returned.
 *
 * NOTE(review): callers must serialize access to remm_info->remm_list
 * (the callers in this file hold list_rwlock) — this function does not lock.
 */
lsv_remm_block * remm_iterm_block_malloc(lsv_volume_proto_t *lsv_volume_proto){
	lsv_s32_t err = 0;
	lsv_u32_t chunk_id;
	lsv_s8_t *mem_block = NULL;
	lsv_remm_block *remm_block = NULL;

	lsv_remm_info * remm_info = (lsv_remm_info *)lsv_volume_proto->remm_info;
	remm_block = (lsv_remm_block *)malloc(sizeof(lsv_remm_block));
	if(NULL == remm_block){
		DERROR("malloc for remm_block failed\n");
		err = -ENOMEM;
		goto EXIT;
	}
	memset(remm_block, 0, sizeof(lsv_remm_block));

	/* Staging buffer that mirrors the on-volume chunk, one item slot at a time. */
	mem_block = (lsv_s8_t *)malloc(LSV_CHUNK_SIZE);
	if(NULL == mem_block){
		DERROR("malloc for mem_block failed\n");
		err = -ENOMEM;
		goto EXIT;
	}
	memset(mem_block, 0, LSV_CHUNK_SIZE);
	remm_block->chunk_buf = mem_block;

	/* Back the buffer with a real chunk and persist the zeroed initial state. */
	err = lsv_volume_chunk_malloc(lsv_volume_proto, LSV_WBUFFER_STORAGE_TYPE, &chunk_id);
	if(err){
		DERROR("call lsv_volume_chunk_malloc failed\n");
		err = -1;
		goto EXIT;
	}
	err = lsv_volume_chunk_update(lsv_volume_proto, chunk_id, mem_block, 0, LSV_CHUNK_SIZE);
	if(err){
		/* TODO(review): chunk_id allocated above is leaked on this path —
		 * no chunk-free API is visible from here; confirm and release it. */
		DERROR("call lsv_volume_chunk_update failed\n");
		err = -1;
		goto EXIT;
	}
	remm_block->chunk_id = chunk_id;

	remm_block->iterm_num = 0;	/* items appended so far */
	remm_block->iterm_cur = 0;	/* items already flushed by the batch sender */
	memset(remm_block->dirty_bitmap, 0, LSV_CHUNK_SIZE / LSV_PAGE_SIZE / 8);
	list_add_tail(&remm_block->list, &remm_info->remm_list);

	return remm_block;
EXIT:
	/* free(NULL) is a no-op, but keep the original defensive style. */
	if(NULL != remm_block){
		free(remm_block);
	}
	if(NULL != mem_block){
		free(mem_block);
	}
	return NULL;
}

/*
 * Queue a WRITE record (lba + size only) into the tail remm block for later
 * batched replication. The payload `buf` is intentionally NOT copied here:
 * remm_batch_send_by_remm() re-reads the data from the volume at flush time.
 *
 * Returns 0 on success, -ENOMEM if no block could be allocated.
 */
lsv_s32_t remm_data_send_async(lsv_volume_proto_t *lsv_volume_proto, lsv_s8_t * buf, lsv_u64_t lba, lsv_s32_t size){
	lsv_remm_iterm iterm;
	lsv_s32_t err = 0;
	/* Page index of the written lba inside its chunk; used to mark the
	 * dirty bitmap (MSB-first within each byte). */
	lsv_s32_t page_idx = (lba % LSV_CHUNK_SIZE) / LSV_PAGE_SIZE;
	lsv_remm_block *remm_block = NULL;
	struct list_head * index = NULL;
	lsv_remm_info * remm_info = (lsv_remm_info *)lsv_volume_proto->remm_info;

	iterm.op_code = LSV_OP_TYPE_DATA|LSV_OP_CODE_WRITE;
	iterm.size = size;
	iterm.lba = lba;

	lsv_wrlock(&list_rwlock);
	if(list_empty(&remm_info->remm_list)){
		remm_block = remm_iterm_block_malloc(lsv_volume_proto);
	}else{
		/* Tail of the list == most recently allocated block. */
		list_for_each_prev(index, &remm_info->remm_list){
			remm_block = (lsv_remm_block*)index;
			break;
		}
		/* Tail block is full — start a new one. */
		if(remm_block->iterm_num == LSV_CHUNK_SIZE / sizeof(lsv_remm_iterm)){
			remm_block = remm_iterm_block_malloc(lsv_volume_proto);
		}
	}
	/* remm_iterm_block_malloc() returns NULL on allocation failure;
	 * the original code dereferenced it unconditionally. */
	if(NULL == remm_block){
		lsv_rwunlock(&list_rwlock);
		DERROR("remm_iterm_block_malloc failed\n");
		return -ENOMEM;
	}

	memcpy(remm_block->chunk_buf + remm_block->iterm_num * sizeof(lsv_remm_iterm), &iterm, sizeof(lsv_remm_iterm));
	remm_block->iterm_num ++;

	remm_block->dirty_bitmap[page_idx/8] |= (1 << (8 - 1 - page_idx % 8));
	lsv_rwunlock(&list_rwlock);

	return err;
}

/*
 * Queue a TRUNCATE record into the tail remm block for later batched
 * replication by remm_batch_send_by_remm().
 *
 * Returns 0 on success, -ENOMEM if no block could be allocated.
 */
lsv_s32_t remm_truncate_send_async(lsv_volume_proto_t *lsv_volume_proto, lsv_s32_t size){
	lsv_remm_iterm iterm;
	lsv_s32_t err = 0;
	lsv_s32_t page_idx = 0;
	lsv_remm_block *remm_block = NULL;
	struct list_head * index = NULL;

	lsv_remm_info * remm_info = (lsv_remm_info *)lsv_volume_proto->remm_info;
	iterm.op_code = LSV_OP_TYPE_MSG|LSV_OP_CODE_TRUNCATE;
	iterm.size = size;
	/* Truncate has no lba; zero it so no stack garbage is memcpy'd into
	 * the persistent chunk buffer (the original left it uninitialized). */
	iterm.lba = 0;

	lsv_wrlock(&list_rwlock);
	if(list_empty(&remm_info->remm_list)){
		remm_block = remm_iterm_block_malloc(lsv_volume_proto);
	}else{
		/* Tail of the list == most recently allocated block. */
		list_for_each_prev(index, &remm_info->remm_list){
			remm_block = (lsv_remm_block*)index;
			break;
		}
		/* Tail block is full — start a new one. */
		if(remm_block->iterm_num == LSV_CHUNK_SIZE / sizeof(lsv_remm_iterm)){
			remm_block = remm_iterm_block_malloc(lsv_volume_proto);
		}
	}
	/* remm_iterm_block_malloc() returns NULL on allocation failure;
	 * the original code dereferenced it unconditionally. */
	if(NULL == remm_block){
		lsv_rwunlock(&list_rwlock);
		DERROR("remm_iterm_block_malloc failed\n");
		return -ENOMEM;
	}

	memcpy(remm_block->chunk_buf + remm_block->iterm_num * sizeof(lsv_remm_iterm), &iterm, sizeof(lsv_remm_iterm));
	/* NOTE(review): here the dirty bitmap is indexed by item slot, while
	 * remm_data_send_async() indexes it by lba page — confirm which is
	 * intended; behavior preserved as-is. */
	page_idx = remm_block->iterm_num ++;
	remm_block->dirty_bitmap[page_idx / 8] |= (1 << (8 - 1 - page_idx % 8));
	lsv_rwunlock(&list_rwlock);

	return err;
}

/*
 * Flush one queued remm block: detach the first block that still has unsent
 * items, replay its items synchronously (truncates directly; writes by
 * re-reading the payload from the volume), and re-queue the block if it was
 * not fully drained.
 *
 * Returns 0 on success (including "nothing to do"), negative errno on error.
 *
 * Fixes vs. the original:
 *  - list_del_init() inside list_for_each() left index->next == index, so
 *    the iterator could never advance (infinite loop); we now stop iterating
 *    as soon as a block is unlinked.
 *  - `buf` was malloc'd per WRITE item and never freed (leak per item).
 *  - on success `err` still held the last read's byte count, so a fully
 *    successful flush returned nonzero; it is now normalized to 0.
 */
lsv_s32_t remm_batch_send_by_remm(lsv_volume_proto_t *lsv_volume_proto){
	lsv_remm_iterm *iterm = NULL;
	lsv_s8_t * buf = NULL;
	lsv_s32_t err = 0;
	lsv_s32_t i = 0;
	lsv_remm_block *remm_block = NULL;
	struct list_head *index = NULL;
	volume_proto_t *volume_proto = NULL;
	io_t io;
	buffer_t commit_buf;

	lsv_remm_info * remm_info = (lsv_remm_info *)lsv_volume_proto->remm_info;
	lsv_wrlock(&list_rwlock);

	/* Pick the first block and decide whether it needs flushing. */
	list_for_each(index, &remm_info->remm_list){
		remm_block = (lsv_remm_block*)index;
		if(remm_block->iterm_num == remm_block->iterm_cur){
			/* Fully drained already — nothing to flush. */
			remm_block = NULL;
			break;
		}
		/* Detach the block so items can be replayed without the lock.
		 * Must break here: after list_del_init() the iterator cannot
		 * advance (index->next == index). */
		list_del_init(index);
		break;
	}

	lsv_rwunlock(&list_rwlock);
	if(NULL == remm_block){
		return 0;
	}

	for(i = remm_block->iterm_cur; i < remm_block->iterm_num; i++){
		iterm = (lsv_remm_iterm *)(remm_block->chunk_buf + i * sizeof(lsv_remm_iterm));
		if((LSV_OP_TYPE_MSG | LSV_OP_CODE_TRUNCATE) == iterm->op_code){
			remm_truncate_send_sync(lsv_volume_proto, iterm->size);
		}else if((LSV_OP_TYPE_DATA | LSV_OP_CODE_WRITE) == iterm->op_code){
			buf = (lsv_s8_t *)malloc(iterm->size);
			if(NULL == buf){
				err = -ENOMEM;
				break;
			}
			/* Re-read the payload from the volume: the async path only
			 * recorded (lba, size), not the data itself. */
			volume_proto = lsv_volume_proto->volume_proto;
			io_init(&io, &volume_proto->chkid, NULL, iterm->lba, iterm->size, 0);
			err = volume_proto_read_raw(volume_proto, &io, &commit_buf);
			if(err != iterm->size){
				DERROR("call lsv_read load offset: %lu failed\n", iterm->lba);
				free(buf);
				buf = NULL;
				/* Normalize a short-read byte count to an error code. */
				err = (err < 0) ? err : -1;
				/* TODO(review): confirm whether commit_buf is populated
				 * on a failed/short read and needs mbuffer_free here. */
				break;
			}
			mbuffer_get(&commit_buf, buf, iterm->size);
			remm_data_send_sync(lsv_volume_proto, buf, iterm->lba, iterm->size);
			mbuffer_free(&commit_buf);
			free(buf);
			buf = NULL;
			err = 0;	/* read succeeded; do not leak the byte count as the return value */
		}else{
			DERROR("OP_CODE: %d not implemented\n", iterm->op_code);
		}

		remm_block->iterm_cur ++;
	}

	if(i < remm_block->iterm_num){
		/* Not fully drained — put the block back for a later pass. */
		lsv_wrlock(&list_rwlock);
		list_add(&remm_block->list, &remm_info->remm_list);
		lsv_rwunlock(&list_rwlock);
	}
	/* TODO(review): a fully drained block stays detached and is never freed
	 * (descriptor, chunk_buf, on-volume chunk) — confirm ownership and
	 * release it here if nothing else references it. */

	return err;
}
